package org.databandtech.flinkstreaming;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.databandtech.flinkstreaming.Entity.EpgVod;

public class KafkaTimestampAssigner implements SerializableTimestampAssigner<EpgVod> {

	private static final long serialVersionUID = 6322052560010286410L;

	@Override
	public long extractTimestamp(EpgVod model, long recordTimestamp) {
		Long timestampByEventTime = Long.parseLong(model.getLog_time());
		
		//flink需要13位的时间戳(带毫秒)
		if (timestampByEventTime < 10000000000L) {
			timestampByEventTime = timestampByEventTime * 1000L;
		}	
		return timestampByEventTime;
	}
}
